[dbt] データウェアハウスにロードされているローデータを「sources」として定義する(と便利だよ)
大阪オフィスの玉井です。
dbtはELTの「T」を行うツールです。つまり、DWHに対して、既にEとLが終わっている…言い換えると、DWHに対して、既にデータがロード済であることが(dbtを使用する上で)前提となっています。
ELTは、(見ればわかりますが)Tより先にLがあります。ローデータをゴニョゴニョ変換する前に、まずはそのままDWHにロードする、ということです。つまり、DWHにロード済のローデータが既にあることが、dbtを使用する上での前提になります。
dbtでは、この「DWHにロードされているローデータ」をsources
として定義することができます。そして定義したsources
は、色々な形で利用することができます。今回は、そのsources
を実際に使ってみました。
検証環境
- macOS Catalina
- dbt CLI 0.18.1
- Google BigQuery
やってみた
sources
をプロジェクト上で定義する
DWHに下記のようなロード済のローデータがあったとします(ここから色々変換して、データマート用のテーブルとかを作りたいとする)。
こちらをdbtプロジェクト上でsources
として定義したいと思います。
sources
を定義する
dbt側でsourcesを定義するには、Modelディレクトリ下のYAMLファイルに下記のように記述します。
sources: - name: jaffle database: tamai-rei tables: - name: raw_customer - name: raw_order
ざっくり言うと、database
はDB名、tables
はテーブル名を定義します。スキーマ名(BQの場合はデータセット名)はname
で定義した名前がそのまま使われます。しかし、schema
というパラメータでスキーマ名を明示的に定義することもできます。
定義したsources
を他の場所から参照する
定義したsources
は、modelファイル内等で参照することができます。
select id as order_id, user_id as customer_id, order_date, status from {{ source('jaffle','raw_order') }}
DWH側を確認する
sources
を定義して、modelからsources
を参照する形をとった状態で、dbtを実行するとどうなるのでしょうか。dbt run
を実施します。
modelファイル側のFROM句は、sources
を参照するJinjaコードになっていましたが、DWH上では、sources
で定義したローデータのテーブルを正しく参照するクエリとしてコンパイルされています。
sourcesをテストする
dbtは作成したmodelに対してテストを設定することができます。詳細は下記をどうぞ。
このテストは、sources
にも設定することができます。sources
で定義するのは、DWHにロードされてきたローデータなので、実質、ELTのLで入ってくるデータに対してテストを行うことができる、ということになります。
テストの書き方はmodelに対するものと同じです。
sources: - name: jaffle database: tamai-rei tables: - name: raw_customer columns: - name: id tests: - unique - not_null - name: raw_order columns: - name: id tests: - unique - not_null
実際にテストを実行します。
$ dbt test Running with dbt=0.18.1 Found 5 models, 11 tests, 0 snapshots, 0 analyses, 155 macros, 0 operations, 0 seed files, 2 sources 19:40:12 | Concurrency: 4 threads (target='learn') 19:40:12 | 19:40:12 | 1 of 11 START test accepted_values_stg_orders_status__completed__shipped__returned__placed__return_pending [RUN] 19:40:12 | 2 of 11 START test assert_positive_value_for_total_amount............ [RUN] 19:40:12 | 3 of 11 START test not_null_stg_customers_customer_id................ [RUN] 19:40:12 | 4 of 11 START test not_null_stg_orders_order_id...................... [RUN] 19:40:14 | 4 of 11 PASS not_null_stg_orders_order_id............................ [PASS in 2.54s] 19:40:14 | 5 of 11 START test relationships_stg_orders_customer_id__customer_id__ref_stg_customers_ [RUN] 19:40:14 | 3 of 11 PASS not_null_stg_customers_customer_id...................... [PASS in 2.64s] 19:40:14 | 6 of 11 START test source_not_null_jaffle_raw_customer_id............ [RUN] 19:40:15 | 2 of 11 PASS assert_positive_value_for_total_amount.................. [PASS in 3.14s] 19:40:15 | 7 of 11 START test source_not_null_jaffle_raw_order_id............... [RUN] ...
sources
として定義したテーブルに対してテストクエリが実行されています。
sourcesをドキュメントに反映させる
dbtは、作成したデータモデルに関するドキュメントを自動生成することができます。詳細は下記をどうぞ。
当然、sources
もドキュメントに含めることができます。これも、定義の仕方は従来と同じです。
sources: - name: jaffle_shop description: "BQに存在するパブリックデータからCTASして生成" database: dbt-tutorial tables: - name: customers description: "顧客データ(raw)" columns: - name: id tests: - unique - not_null - name: orders description: "注文データ(raw)" columns: - name: id tests: - unique - not_null
ドキュメントにsources
のdescription
が表示されています。
もちろん、DAGにも反映されます。
sources
の「鮮度」をチェックする
DWHにロードするデータは様々ですが、定期的に新しいデータがロードされ続けるテーブルが存在するケースもあります。そういうケースの場合、sources
に定義しているテーブルのデータの「鮮度」をdbtでチェックすることができます。
下記のように記述します。
sources: - name: jaffle description: "BQに存在するパブリックデータからCTASして生成" database: tamai-rei tables: - name: raw_customer description: "顧客データ(raw)" columns: - name: id tests: - unique - not_null - name: raw_order loaded_at_field: _etl_loaded_at freshness: warn_after: {count: 12, period: hour} error_after: {count: 24, period: hour} description: "注文データ(raw)" columns: - name: id tests: - unique - not_null
raw_order
に新しいパラメータを追加しています。
loaded_at_field
は、鮮度の計算に使用するための時間データが入ったカラムを指定します。(後々クエリを見ればわかりますが)dbtはこのカラムのMAX値をとり、現在時刻との差を計ることで、データの鮮度をチェックします。ちなみに、timestampかつUTCじゃないとダメなので、そうじゃない場合はこの部分でキャストをかけるクエリを記述する必要があります。(loaded_at_field: "convert_timezone('UTC', 'Asia/Tokyo', _etl_loaded_at)
など)。
freshness
以降で、「現在時刻とデータの時刻がこれだけ離れていたら警告/エラーを出す」という定義をします。例えば、warn_after: {count: 12, period: hour}
は「最新データが現在時刻より12時間以上離れていたら警告を出す」という感じです。
この状態で、dbt source snapshot-freshness
を実行すると、sources
の鮮度を計ることができます。
$ dbt source snapshot-freshness Running with dbt=0.18.1 Found 5 models, 11 tests, 0 snapshots, 0 analyses, 155 macros, 0 operations, 0 seed files, 2 sources 20:35:05 | Concurrency: 4 threads (target='learn') 20:35:05 | 20:35:06 | 1 of 1 START freshness of jaffle.raw_order........................... [RUN] 20:35:09 | 1 of 1 WARN freshness of jaffle.raw_order............................ [WARN in 3.98s] 20:35:09 | Done.
WARN
が出たので、このデータは現在時刻(コマンド実行時刻)から12時間以上経過している、ということになります。
DWH側では、下記のクエリが実行されます。
単発で実行するというよりは、ジョブとして定期的に実行し続けて、データの鮮度に問題ないかどうかウォッチする、という使い方が想定されます。
おわりに
dbtがいかに「ELT」を意識したツールになっているかということをひしひしと感じました。まずロードありきということで、そのままロードしたデータがDWHにある前提で、そのローデータを「sources」として定義し、そこから各種データモデルに派生させていく、という思想がツール越しに伝わってきました。要するに、「sources」として定義するべきデータがDWHに無いという場合、それはELTのEとLが正しく完了していないということになります。dbtを使用する = ELTのEとLを正しく実施する(整備する)…ということになるでしょう。